//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

#ifndef DATASTORM_DATASTORM_H
#define DATASTORM_DATASTORM_H

#include "Config.h"
#include "DataStorm/Sample.h"
#include "InternalI.h"
#include "InternalT.h"
#include "Node.h"
#include "Types.h"

#include <regex>

namespace DataStorm
{

    /**
     * A sample provides information about a data element update.
     *
     * The Sample template provides access to the key, value as well as additional information such as the event,
     * timestamp, update tag. Samples are generated and published by writers and received by readers.
     *
     * @headerfile DataStorm/DataStorm.h
     */
    template<typename Key, typename Value, typename UpdateTag = std::string> class Sample
    {
    public:
        /**
         * The type of the sample key.
         */
        using KeyType = Key;

        /**
         * The type of the sample value.
         */
        using ValueType = Value;

        /**
         * The type of the update tag. The update tag type defaults to string if it's not explicitly specified
         * with the Sample template parameters.
         */
        using UpdateTagType = UpdateTag;

        /**
         * The event associated with the sample.
         *
         * @return The sample event.
         */
        SampleEvent getEvent() const noexcept;

        /**
         * The key of the sample.
         *
         * @return The sample key.
         */
        const Key& getKey() const noexcept;

        /**
         * The value of the sample.
         *
         * Depending on the sample event, the sample value might not always be available. It's the case if the
         * sample event is Remove where this method will return a default value.
         *
         * @return The sample value.
         */
        const Value& getValue() const noexcept;

        /**
         * The update tag for the partial update.
         *
         * This method should only be called if the sample event is PartialUpdate.
         *
         * @return The update tag.
         */
        UpdateTag getUpdateTag() const noexcept;

        /**
         * The timestamp of the sample.
         *
         * The timestamp is generated by the writer and corresponds to the time of sending.
         *
         * @return The timestamp.
         */
        std::chrono::time_point<std::chrono::system_clock> getTimeStamp() const noexcept;

        /**
         * The origin of the sample.
         *
         * The origin of the sample identifies uniquely on the node the writer that created the sample. It's the
         * name of the writer if a name was explicitly provided on creation of the writer. Otherwise, if no name
         * was provided, an unique identifier is generated by DataStorm.
         *
         * @return The origin of the sample.
         */
        std::string getOrigin() const noexcept;

        /**
         * Get the session identifier of the session that received this sample.
         *
         * This session identifier can be used to retrieve the Ice connection with the node.
         *
         * @return The session identifier.
         */
        std::string getSession() const noexcept;

        /** @private */
        Sample(const std::shared_ptr<DataStormI::Sample>&) noexcept;

    private:
        std::shared_ptr<DataStormI::SampleT<Key, Value, UpdateTag>> _impl;
    };

    /**
     * Convert the given sample type to a string and add it to the stream.
     *
     * @param os The output stream
     * @param sampleType The sample type to add to the stream
     * @return The output stream
     */
    inline std::ostream& operator<<(std::ostream& os, SampleEvent sampleType)
    {
        switch (sampleType)
        {
            case SampleEvent::Add:
                os << "Add";
                break;
            case SampleEvent::Update:
                os << "Update";
                break;
            case SampleEvent::Remove:
                os << "Remove";
                break;
            case SampleEvent::PartialUpdate:
                os << "PartialUpdate";
                break;
            default:
                os << static_cast<int>(sampleType);
                break;
        }
        return os;
    }

    /**
     * Convert the given sample type vector to a string and add it to the stream.
     *
     * @param os The output stream
     * @param types The sample type vector to add to the stream
     * @return The output stream
     */
    inline std::ostream& operator<<(std::ostream& os, const std::vector<SampleEvent>& types)
    {
        os << "[";
        for (auto p = types.begin(); p != types.end(); ++p)
        {
            if (p != types.begin())
            {
                os << ',';
            }
            os << *p;
        }
        os << "]";
        return os;
    }

    /**
     * Convert the given sample to a string and add it to the stream. The implementation outputs the sample value.
     *
     * @param os The output stream
     * @param sample The sample to add to the stream
     * @return The output stream
     */
    template<typename K, typename V, typename U>
    std::ostream& operator<<(std::ostream& os, const Sample<K, V, U>& sample)
    {
        os << sample.getValue();
        return os;
    }

    /**
     * The Reader class is used to retrieve samples for a data element.
     *
     * @headerfile DataStorm/DataStorm.h
     */
    template<typename Key, typename Value, typename UpdateTag> class Reader
    {
    public:
        /**
         * The key type.
         */
        using KeyType = Key;

        /**
         * The value type.
         */
        using ValueType = Value;

        /**
         * Transfers the given reader to this reader.
         *
         * @param reader The reader.
         **/
        Reader(Reader&& reader) noexcept;

        /**
         * Destruct the reader. The destruction of the reader disconnects the reader from the writers.
         */
        ~Reader();

        /**
         * Move assignment operator
         *
         * @param reader The reader.
         **/
        Reader& operator=(Reader&& reader) noexcept;

        /**
         * Indicates whether or not writers are online.
         *
         * @return True if writers are connected, false otherwise.
         */
        bool hasWriters() const noexcept;

        /**
         * Wait for given number of writers to be online. The node shutdown will cause this method to raise
         * NodeShutdownException.
         *
         * @param count The number of writers to wait.
         */
        void waitForWriters(unsigned int count = 1) const;

        /**
         * Wait for readers to be offline. The node shutdown will cause this method to raise
         * NodeShutdownException.
         */
        void waitForNoWriters() const;

        /**
         * Get the connected writers.
         *
         * @return The names of the connected writers.
         */
        std::vector<std::string> getConnectedWriters() const noexcept;

        /**
         * Get the keys for which writers are connected to this reader.
         *
         * @return The keys for which we have writers connected.
         **/
        std::vector<Key> getConnectedKeys() const noexcept;

        /**
         * Returns all the unread samples.
         *
         * @return The unread samples.
         */
        std::vector<Sample<Key, Value, UpdateTag>> getAllUnread() noexcept;

        /**
         * Wait for given number of unread samples to be available. The node shutdown will cause this method to
         * raise NodeShutdownException.
         */
        void waitForUnread(unsigned int count = 1) const;

        /**
         * Returns wether or not unread samples are available.
         *
         * @return True if there unread samples are queued, false otherwise.
         */
        bool hasUnread() const noexcept;

        /**
         * Returns the next unread sample. The node shutdown will cause this method to raise
         * NodeShutdownException.
         *
         * @return The unread sample.
         */
        Sample<Key, Value, UpdateTag> getNextUnread();

        /**
         * Calls the given functions to provide the initial set of connected keys and when a key is added or
         * removed from the set of connected keys. If callback functions are already set, they will be replaced.
         *
         * The connected keys represent the set of keys for which writers are connected to this reader.
         *
         * The init callback is always called after this method returns to provide the initial set of connected
         * keys. The update callback is called when new keys are added or removed from the set of connected keys.
         *
         * @param init The function to call with the initial set of connected keys.
         * @param update The function to call when a key is added or removed from the set.
         **/
        void onConnectedKeys(
            std::function<void(std::vector<Key>)> init,
            std::function<void(CallbackReason, Key)> update) noexcept;

        /**
         * Calls the given functions to provide the initial set of connected writers and when a new writer
         * connects or disconnects. If callback functions are already set, they will be replaced.
         *
         * The init callback is always called after this method returns to provide the initial set of connected
         * writers. The update callback is called when new writers connect or disconnect.
         *
         * @param init The function to call with the initial set of connected writers.
         * @param update The function to call when a new writer connects or disconnects.
         **/
        void onConnectedWriters(
            std::function<void(std::vector<std::string>)> init,
            std::function<void(CallbackReason, std::string)> update) noexcept;

        /**
         * Calls the given function to provide the initial set of unread samples and when new samples are queued.
         *
         * If a function is already set, it will be replaced.
         *
         * The init callback is always called after this method returns to provide the initial set of unread
         * samples. The queue callback is called when a new sample is received.
         *
         * @param init The function to call with the initial set of unread samples.
         * @param queue The function to call when a new sample is received.
         **/
        void onSamples(
            std::function<void(std::vector<Sample<Key, Value, UpdateTag>>)> init,
            std::function<void(Sample<Key, Value, UpdateTag>)> queue) noexcept;

    protected:
        /** @private */
        Reader(const std::shared_ptr<DataStormI::DataReader>& impl) noexcept : _impl(impl) {}

        /** @private */
        std::shared_ptr<DataStormI::DataReader> _impl;
    };

    /**
     * The writer class is used to write samples for a data element.
     *
     * @headerfile DataStorm/DataStorm.h
     */
    template<typename Key, typename Value, typename UpdateTag> class Writer
    {
    public:
        /**
         * The key type.
         */
        using KeyType = Key;

        /**
         * The value type.
         */
        using ValueType = Value;

        /**
         * Transfers the given writer to this writer.
         *
         * @param writer The writer.
         **/
        Writer(Writer&& writer) noexcept;

        /**
         * Move assignment operator.
         *
         * @param writer The writer.
         **/
        Writer& operator=(Writer&& writer) noexcept;

        /**
         * Destruct the writer. The destruction of the writer disconnects the writer from the readers.
         */
        ~Writer();

        /**
         * Indicates whether or not readers are online.
         *
         * @return True if readers are connected, false otherwise.
         */
        bool hasReaders() const noexcept;

        /**
         * Wait for given number of readers to be online. The node shutdown will cause this method to raise
         * NodeShutdownException.
         *
         * @param count The number of readers to wait.
         */
        void waitForReaders(unsigned int count = 1) const;

        /**
         * Wait for readers to be offline.  The node shutdown this method to raise NodeShutdownException.
         */
        void waitForNoReaders() const;

        /**
         * Get the connected readers.
         *
         * @return The names of the connected readers.
         */
        std::vector<std::string> getConnectedReaders() const noexcept;

        /**
         * Get the keys for which readers are connected to this writer.
         *
         * @return The keys for which we have writers connected.
         **/
        std::vector<Key> getConnectedKeys() const noexcept;

        /**
         * Get the last written sample. If there's no sample, the std::logic_error exception is raised.
         *
         * @return The last written sample.
         **/
        Sample<Key, Value, UpdateTag> getLast();

        /**
         * Get all the written sample kept in the writer history.
         *
         * @return The sample history.
         **/
        std::vector<Sample<Key, Value, UpdateTag>> getAll() noexcept;

        /**
         * Calls the given functions to provide the initial set of connected keys and when a key is added or
         * removed from the set of connected keys. If callback functions are already set, they will be replaced.
         *
         * The connected keys represent the set of keys for which writers are connected to this reader.
         *
         * The init callback is always called after this method returns to provide the initial set of connected
         * keys. The update callback is called when new keys are added or removed from the set of connected keys.
         *
         * @param init The function to call with the initial set of connected keys.
         * @param update The function to call when a key is added or removed from the set.
         **/
        void onConnectedKeys(
            std::function<void(std::vector<Key>)> init,
            std::function<void(CallbackReason, Key)> update) noexcept;

        /**
         * Calls the given functions to provide the initial set of connected readers and when a new reader
         * connects or disconnects. If callback functions are already set, they will be replaced.
         *
         * The init callback is always called after this method returns to provide the initial set of connected
         * readers. The update callback is called when new readers connect or disconnect.
         *
         * @param init The function to call with the initial set of connected readers.
         * @param update The function to call when a new reader connects or disconnects.
         **/
        void onConnectedReaders(
            std::function<void(std::vector<std::string>)> init,
            std::function<void(CallbackReason, std::string)> update) noexcept;

    protected:
        /** @private */
        Writer(const std::shared_ptr<DataStormI::DataWriter>& impl) noexcept : _impl(impl) {}

        /** @private */
        std::shared_ptr<DataStormI::DataWriter> _impl;
    };

    /**
     * The Topic class.
     *
     * This class allows constructing reader and writer objects. It's also used to setup filter and updater
     * functions.
     *
     * @headerfile DataStorm/DataStorm.h
     */
    template<typename Key, typename Value, typename UpdateTag = std::string> class Topic
    {
    public:
        /**
         * The topic's key type.
         */
        using KeyType = Key;

        /**
         * The topic's value type.
         */
        using ValueType = Value;

        /**
         * The topic's update tag type (defaults to std::string if not specified).
         */
        using UpdateTagType = UpdateTag;

        /**
         * The topic's writer type.
         */
        using WriterType = Writer<Key, Value, UpdateTag>;

        /**
         * The topic's reader type.
         */
        using ReaderType = Reader<Key, Value, UpdateTag>;

        /**
         * The topic's sample type.
         */
        using SampleType = Sample<Key, Value, UpdateTag>;

        /**
         * Construct a new Topic for the topic with the given name.
         *
         * @param node The node.
         * @param name The name of the topic.
         */
        Topic(const Node& node, const std::string& name) noexcept;

        /**
         * Construct a new Topic by taking ownership of the given topic.
         *
         * @param topic The topic to transfer ownership from.
         */
        Topic(Topic&& topic) noexcept
            : _name(std::move(topic._name)),
              _topicFactory(std::move(topic._topicFactory)),
              _keyFactory(std::move(topic._keyFactory)),
              _tagFactory(std::move(topic._tagFactory)),
              _keyFilterFactories(std::move(topic._keyFilterFactories)),
              _sampleFilterFactories(std::move(topic._sampleFilterFactories)),
              _reader(std::move(topic._reader)),
              _writer(std::move(topic._writer)),
              _updaters(std::move(topic._updaters))
        {
        }

        /**
         * Destruct the Topic. This disconnects the topic from peers.
         */
        ~Topic();

        /**
         * Move assignment operator.
         *
         * @param topic The topic.
         **/
        Topic& operator=(Topic&& topic) noexcept;

        /**
         * Indicates whether or not data writers are online.
         *
         * @return True if data writers are connected, false otherwise.
         */
        bool hasWriters() const noexcept;

        /**
         * Wait for given number of data writers to be online. The node shutdown will cause this method to raise
         * NodeShutdownException.
         *
         * @param count The number of date writers to wait.
         */
        void waitForWriters(unsigned int count = 1) const;

        /**
         * Wait for data writers to be offline. The node shutdown will cause this method to raise
         * NodeShutdownException.
         */
        void waitForNoWriters() const;

        /**
         * Set the default configuration used to construct readers.
         *
         * @param config The default writer configuration.
         */
        void setWriterDefaultConfig(const WriterConfig& config) noexcept;

        /**
         * Indicates whether or not data readers are online.
         *
         * @return True if data readers are connected, false otherwise.
         */
        bool hasReaders() const noexcept;

        /**
         * Wait for given number of data readers to be online. The node shutdown will cause this method to raise
         * NodeShutdownException.
         *
         * @param count The number of data readers to wait.
         */
        void waitForReaders(unsigned int count = 1) const;

        /**
         * Wait for data readers to be offline. The node shutdown will cause this method to raise
         * NodeShutdownException.
         */
        void waitForNoReaders() const;

        /**
         * Set the default configuration used to construct readers.
         *
         * @param config The default reader configuration.
         */
        void setReaderDefaultConfig(const ReaderConfig& config) noexcept;

        /**
         * Set an updater function for the given update tag. The function is called when a partial update is
         * received or sent to compute the new value. The function is provided the latest value and the partial
         * update. It should return the new value.
         *
         * @param tag The update tag.
         * @param updater The updater function.
         */
        template<typename UpdateValue>
        void setUpdater(const UpdateTag& tag, std::function<void(Value&, UpdateValue)> updater) noexcept;

        /**
         * Set a key filter factory. The given factory function must return a filter function that returns true if
         * the key matches the filter criteria, false otherwise.
         *
         * @param name The name of the key filter.
         * @param factory The filter factory function.
         */
        template<typename Criteria>
        void setKeyFilter(
            const std::string& name,
            std::function<std::function<bool(const Key&)>(const Criteria&)> factory) noexcept;

        /**
         * Set a sample filter factory. The given factory function must return a filter function that returns true
         * if the sample matches the filter criteria, false otherwise.
         *
         * @param name The name of the sample filter.
         * @param factory The filter factory function.
         */
        template<typename Criteria>
        void setSampleFilter(
            const std::string& name,
            std::function<std::function<bool(const SampleType&)>(const Criteria&)> factory) noexcept;

    private:
        std::shared_ptr<DataStormI::TopicReader> getReader() const;
        std::shared_ptr<DataStormI::TopicWriter> getWriter() const;
        Ice::CommunicatorPtr getCommunicator() const noexcept;

        template<typename, typename, typename> friend class SingleKeyWriter;
        template<typename, typename, typename> friend class MultiKeyWriter;
        template<typename, typename, typename> friend class SingleKeyReader;
        template<typename, typename, typename> friend class MultiKeyReader;
        template<typename, typename, typename> friend class FilteredKeyReader;

        const std::string _name;
        const std::shared_ptr<DataStormI::TopicFactory> _topicFactory;
        const std::shared_ptr<DataStormI::KeyFactoryT<Key>> _keyFactory;
        const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;

        const std::shared_ptr<DataStormI::FilterManagerT<DataStormI::KeyT<Key>>> _keyFilterFactories;
        const std::shared_ptr<DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>>
            _sampleFilterFactories;

        mutable std::mutex _mutex;
        mutable std::shared_ptr<DataStormI::TopicReader> _reader;
        mutable std::shared_ptr<DataStormI::TopicWriter> _writer;
        mutable std::map<std::shared_ptr<DataStormI::Tag>, DataStormI::Topic::Updater> _updaters;
    };

    /**
     * Filter structure to specify the filter name and criteria value.
     *
     * @headerfile DataStorm/DataStorm.h
     */
    template<typename T> struct Filter
    {
        /**
         * Construct a filter structure with the given name and criteria.
         *
         * @param name The filter name
         * @param criteria The criteria
         */
        template<typename TT>
        Filter(const std::string& name, TT&& criteria) noexcept : name(name),
                                                                  criteria(std::forward<TT>(criteria))
        {
        }

        /** The filter name. */
        std::string name;

        /** The filter criteria value. */
        T criteria;
    };

    /**
     * The key reader to read the data element associated with a given key.
     *
     * @headerfile DataStorm/DataStorm.h
     */
    template<typename Key, typename Value, typename UpdateTag = std::string>
    class SingleKeyReader : public Reader<Key, Value, UpdateTag>
    {
    public:
        /**
         * Construct a new reader for the given key. The construction of the reader connects the reader to writers
         * with a matching key.
         *
         * @param topic The topic.
         * @param key The key of the data element to read.
         * @param name The optional reader name.
         * @param config The reader configuration.
         */
        SingleKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const Key& key,
            const std::string& name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /**
         * Construct a new reader for the given key and sample filter criteria. The construction of the reader
         * connects the reader to writers with a matching key. The writer will only send samples matching the
         * given sample filter criteria to the reader.
         *
         * @param topic The topic.
         * @param key The key of the data element to read.
         * @param sampleFilter The sample filter.
         * @param name The optional reader name.
         * @param config The reader configuration.
         */
        template<typename SampleFilterCriteria>
        SingleKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const Key& key,
            const Filter<SampleFilterCriteria>& sampleFilter,
            const std::string& name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /**
         * Transfers the given reader to this reader.
         *
         * @param reader The reader.
         **/
        SingleKeyReader(SingleKeyReader&& reader) noexcept;

        /**
         * Move assignment operator.
         *
         * @param reader The reader.
         **/
        SingleKeyReader& operator=(SingleKeyReader&& reader) noexcept;
    };

    /**
     * The key reader to read the data element associated with a given set of keys.
     *
     * @headerfile DataStorm/DataStorm.h
     */
    template<typename Key, typename Value, typename UpdateTag = std::string>
    class MultiKeyReader : public Reader<Key, Value, UpdateTag>
    {
    public:
        /**
         * Construct a new reader for the given keys. The construction of the reader connects the reader to
         * writers with matching keys. If an empty vector of keys is provided, the reader will connect to all the
         * available writers.
         *
         * @param topic The topic.
         * @param keys The keys of the data elements to read.
         * @param name The optional reader name.
         * @param config The reader configuration.
         */
        MultiKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const std::vector<Key>& keys,
            const std::string& name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /**
         * Construct a new reader for the given keys and sample filter criteria. The construction of the reader
         * connects the reader to writers with matching keys. If an empty vector of keys is provided, the reader
         * will connect to all the available writers. The writer will only send samples matching the given sample
         * filter criteria to the reader.
         *
         * @param topic The topic.
         * @param keys The keys of the data elements to read.
         * @param sampleFilter The sample filter.
         * @param name The optional reader name.
         * @param config The reader configuration.
         */
        template<typename SampleFilterCriteria>
        MultiKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const std::vector<Key>& keys,
            const Filter<SampleFilterCriteria>& sampleFilter,
            const std::string& name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /**
         * Transfers the given reader to this reader.
         *
         * @param reader The reader.
         **/
        MultiKeyReader(MultiKeyReader&& reader) noexcept;

        /**
         * Move assignment operator.
         *
         * @param reader The reader.
         **/
        MultiKeyReader& operator=(MultiKeyReader&& reader) noexcept;
    };

    /**
     * Creates a key reader for the given topic and key. This helper method deduces the topic Key, Value and
     * UpdateTag types from the topic argument.
     *
     * @param topic The topic.
     * @param key The key.
     * @param name The optional reader name.
     * @param config The optional reader configuration.
     */
    template<typename K, typename V, typename UT>
    SingleKeyReader<K, V, UT> makeSingleKeyReader(
        const Topic<K, V, UT>& topic,
        const typename Topic<K, V, UT>::KeyType& key,
        const std::string& name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return SingleKeyReader<K, V, UT>(topic, key, name, config);
    }

    /**
     * Creates a key reader for the given topic, key and sample filter. This helper method deduces the topic Key
     * and Value types from the topic argument.
     *
     * @param topic The topic.
     * @param key The key.
     * @param sampleFilter The sample filter.
     * @param name The optional reader name.
     * @param config The optional reader configuration.
     */
    template<typename SFC, typename K, typename V, typename UT>
    SingleKeyReader<K, V, UT> makeSingleKeyReader(
        const Topic<K, V, UT>& topic,
        const typename Topic<K, V, UT>::KeyType& key,
        const Filter<SFC>& sampleFilter,
        const std::string& name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return SingleKeyReader<K, V, UT>(topic, key, sampleFilter, name, config);
    }

    /**
     * Creates a multi-key reader for the given topic. This helper method deduces the topic Key, Value and
     * UpdateTag types from the topic argument.
     *
     * The reader will only receive samples for the given set of keys.
     *
     * @param topic The topic.
     * @param keys The keys.
     * @param name The optional reader name.
     * @param config The optional reader configuration.
     */
    template<typename K, typename V, typename UT>
    MultiKeyReader<K, V, UT> makeMultiKeyReader(
        const Topic<K, V, UT>& topic,
        const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
        const std::string& name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return MultiKeyReader<K, V, UT>(topic, keys, name, config);
    }

    /**
     * Creates a multi-key reader for the given topic, keys and sample filter. This helper method deduces the
     * topic Key and Value types from the topic argument.
     *
     * The reader will only receive samples for the given set of keys.
     *
     * @param topic The topic.
     * @param keys The keys.
     * @param sampleFilter The sample filter.
     * @param name The optional reader name.
     * @param config The optional reader configuration.
     */
    template<typename SFC, typename K, typename V, typename UT>
    MultiKeyReader<K, V, UT> makeMultiKeyReader(
        const Topic<K, V, UT>& topic,
        const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
        const Filter<SFC>& sampleFilter,
        const std::string& name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return MultiKeyReader<K, V, UT>(topic, keys, sampleFilter, name, config);
    }

    /**
     * Creates an any-key reader for the given topic. This helper method deduces the topic Key, Value and
     * UpdateTag types from the topic argument.
     *
     * The reader will receive samples for any keys from the topic.
     *
     * @param topic The topic.
     * @param name The optional reader name.
     * @param config The optional reader configuration.
     */
    template<typename K, typename V, typename UT>
    MultiKeyReader<K, V, UT> makeAnyKeyReader(
        const Topic<K, V, UT>& topic,
        const std::string& name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return MultiKeyReader<K, V, UT>(topic, {}, name, config);
    }

    /**
     * Creates an any-key reader for the given topic and sample filter. This helper method deduces the topic Key
     * and Value types from the topic argument.
     *
     * The reader will receive samples for the keys from the topic.
     *
     * @param topic The topic.
     * @param sampleFilter The sample filter.
     * @param name The optional reader name.
     * @param config The optional reader configuration.
     */
    template<typename SFC, typename K, typename V, typename UT>
    MultiKeyReader<K, V, UT> makeAnyKeyReader(
        const Topic<K, V, UT>& topic,
        const Filter<SFC>& sampleFilter,
        const std::string& name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return MultiKeyReader<K, V, UT>(topic, {}, sampleFilter, name, config);
    }

    /**
     * The filtered reader to read data elements whose key match a given filter.
     *
     * @headerfile DataStorm/DataStorm.h
     */
    template<typename Key, typename Value, typename UpdateTag = std::string>
    class FilteredKeyReader : public Reader<Key, Value, UpdateTag>
    {
    public:
        /**
         * Construct a new reader for the given key filter. The construction of the reader connects the reader to
         * writers whose key matches the key filter criteria.
         *
         * If the key filter is not registered with the topic or the filter invalid, std::invalid_argument is
         * raised.
         *
         * @param topic The topic.
         * @param keyFilter The key filter.
         * @param name The optional reader name.
         * @param config The reader configuration.
         */
        template<typename KeyFilterCriteria>
        FilteredKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const Filter<KeyFilterCriteria>& keyFilter,
            const std::string& name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /**
         * Construct a new reader for the given key filter and sample filter criteria. The construction of the
         * reader connects the reader to writers whose key matches the key filter criteria.
         *
         * If the key filter is not registered with the topic or the filter invalid, std::invalid_argument is
         * raised.
         *
         * @param topic The topic.
         * @param keyFilter The key filter.
         * @param sampleFilter The sample filter.
         * @param name The optional reader name.
         * @param config The reader configuration.
         */
        template<typename KeyFilterCriteria, typename SampleFilterCriteria>
        FilteredKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const Filter<KeyFilterCriteria>& keyFilter,
            const Filter<SampleFilterCriteria>& sampleFilter,
            const std::string& name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /**
         * Transfers the given reader to this reader.
         *
         * @param reader The reader.
         **/
        FilteredKeyReader(FilteredKeyReader&& reader) noexcept;

        /**
         * Move assignment operator.
         *
         * @param reader The reader.
         **/
        FilteredKeyReader& operator=(FilteredKeyReader&& reader) noexcept;
    };

    /**
     * Creates a new filtered reader for the given topic and key filter. This helper method deduces the topic Key,
     * Value and UpdateTag types from the topic argument.
     *
     * @param topic The topic.
     * @param filter The key filter.
     * @param name The optional reader name.
     * @param config The optional reader configuration.
     */
    template<typename KFC, typename K, typename V, typename UT>
    FilteredKeyReader<K, V, UT> makeFilteredKeyReader(
        const Topic<K, V, UT>& topic,
        const Filter<KFC>& filter,
        const std::string& name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return FilteredKeyReader<K, V, UT>(topic, filter, name, config);
    }

    /**
     * Creates a new filter reader for the given topic, key filter and sample filter. This helper method deduces
     * the topic Key, Value and UpdateTag types from the topic argument.
     *
     * @param topic The topic.
     * @param keyFilter The key filter.
     * @param sampleFilter The sample filter.
     * @param name The optional reader name.
     * @param config The optional reader configuration.
     */
    template<typename KFC, typename SFC, typename K, typename V, typename UT>
    FilteredKeyReader<K, V, UT> makeFilteredKeyReader(
        const Topic<K, V, UT>& topic,
        const Filter<KFC>& keyFilter,
        const Filter<SFC>& sampleFilter,
        const std::string& name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return FilteredKeyReader<K, V, UT>(topic, keyFilter, sampleFilter, name, config);
    }

    /**
     * The key writer to write the data element associated with a given key.
     *
     * @headerfile DataStorm/DataStorm.h
     */
    template<typename Key, typename Value, typename UpdateTag = std::string>
    class SingleKeyWriter : public Writer<Key, Value, UpdateTag>
    {
    public:
        /**
         * Construct a new writer for the given key. The construction of the writer connects the writer to readers
         * with a matching key.
         *
         * @param topic The topic.
         * @param key The key of the data element to write.
         * @param name The optional writer name.
         * @param config The writer configuration.
         */
        SingleKeyWriter(
            const Topic<Key, Value, UpdateTag>& topic,
            const Key& key,
            const std::string& name = std::string(),
            const WriterConfig& config = WriterConfig()) noexcept;

        /**
         * Move constructor.
         *
         * @param writer The writer.
         **/
        SingleKeyWriter(SingleKeyWriter&& writer) noexcept;

        /**
         * Move assignment operator.
         *
         * @param writer The writer.
         **/
        SingleKeyWriter& operator=(SingleKeyWriter&& writer) noexcept;

        /**
         * Add the data element. This generates an {@link Add} sample with the
         * given value.
         *
         * @param value The data element value.
         */
        void add(const Value& value) noexcept;

        /**
         * Update the data element. This generates an {@link Update} sample with the
         * given value.
         *
         * @param value The data element value.
         */
        void update(const Value& value) noexcept;

        /**
         * Get a partial update generator function for the given partial update tag. When called, the returned
         * function generates a {@link PartialUpdate} sample with the given partial update value.
         *
         * The UpdateValue template parameter must match the UpdateValue type used to register the updater with
         * the {@link Topic::setUpdater} method.
         *
         * @param tag The partial update tag.
         */
        template<typename UpdateValue>
        std::function<void(const UpdateValue&)> partialUpdate(const UpdateTag& tag) noexcept;

        /**
         * Remove the data element. This generates a {@link Remove} sample.
         */
        void remove() noexcept;

    private:
        const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
    };

    /**
     * The key writer to write data elements associated with a given set of keys.
     *
     * @headerfile DataStorm/DataStorm.h
     */
    template<typename Key, typename Value, typename UpdateTag = std::string>
    class MultiKeyWriter : public Writer<Key, Value, UpdateTag>
    {
    public:
        /**
         * Construct a new writer for the given keys. The construction of the writer connects the writer to
         * readers with matching keys. If an empty vector of keys is provided, the writer will connect to all the
         * available readers.
         *
         * @param topic The topic.
         * @param keys The keys.
         * @param name The optional writer name.
         * @param config The writer configuration.
         */
        MultiKeyWriter(
            const Topic<Key, Value, UpdateTag>& topic,
            const std::vector<Key>& keys,
            const std::string& name = std::string(),
            const WriterConfig& config = WriterConfig()) noexcept;

        /**
         * Transfers the given writer to this writer.
         *
         * @param writer The writer.
         **/
        MultiKeyWriter(MultiKeyWriter&& writer) noexcept;

        /**
         * Move assignment operator.
         *
         * @param writer The writer.
         **/
        MultiKeyWriter& operator=(MultiKeyWriter&& writer) noexcept;

        /**
         * Add the data element. This generates an {@link Add} sample with the given value.
         *
         * @param key The key
         * @param value The data element value.
         */
        void add(const Key& key, const Value& value) noexcept;

        /**
         * Update the data element. This generates an {@link Update} sample with the given value.
         *
         * @param key The key
         * @param value The data element value.
         */
        void update(const Key& key, const Value& value) noexcept;

        /**
         * Get a partial update generator function for the given partial update tag. When called, the returned
         * function generates a {@link PartialUpdate} sample with the given partial update value.
         *
         * The UpdateValue template parameter must match the UpdateValue type used to register the updater with
         * the {@link Topic::setUpdater} method.
         *
         * @param tag The partial update tag.
         */
        template<typename UpdateValue>
        std::function<void(const Key&, const UpdateValue&)> partialUpdate(const UpdateTag& tag) noexcept;

        /**
         * Remove the data element. This generates a {@link Remove} sample.

         * @param key The key
         */
        void remove(const Key& key) noexcept;

    private:
        const std::shared_ptr<DataStormI::KeyFactoryT<Key>> _keyFactory;
        const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
    };

    /**
     * Creates a key writer for the given topic and key. This helper method deduces the topic Key, Value and
     * UpdateTag types from the topic argument.
     *
     * @param topic The topic.
     * @param key The key.
     * @param name The optional writer name.
     * @param config The optional writer configuration.
     */
    template<typename K, typename V, typename UT>
    SingleKeyWriter<K, V, UT> makeSingleKeyWriter(
        const Topic<K, V, UT>& topic,
        const typename Topic<K, V, UT>::KeyType& key,
        const std::string& name = std::string(),
        const WriterConfig& config = WriterConfig()) noexcept
    {
        return SingleKeyWriter<K, V, UT>(topic, key, name, config);
    }

    /**
     * Creates a multi-key writer for the given topic and keys. This helper method deduces the topic Key, Value
     * and UpdateTag types from the topic argument.
     *
     * @param topic The topic.
     * @param keys The keys.
     * @param name The optional writer name.
     * @param config The optional writer configuration.
     */
    template<typename K, typename V, typename UT>
    MultiKeyWriter<K, V, UT> makeMultiKeyWriter(
        const Topic<K, V, UT>& topic,
        const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
        const std::string& name = std::string(),
        const WriterConfig& config = WriterConfig()) noexcept
    {
        return MultiKeyWriter<K, V, UT>(topic, keys, name, config);
    }

    /**
     * Creates an any-key writer for the given topic. This helper method deduces the topic Key, Value and
     * UpdateTag types from the topic argument.
     *
     * @param topic The topic.
     * @param name The optional writer name.
     * @param config The optional writer configuration.
     */
    template<typename K, typename V, typename UT>
    MultiKeyWriter<K, V, UT> makeAnyKeyWriter(
        const Topic<K, V, UT>& topic,
        const std::string& name = std::string(),
        const WriterConfig& config = WriterConfig()) noexcept
    {
        return MultiKeyWriter<K, V, UT>(topic, {}, name, config);
    }

}

//
// Public template based API implementation
//

namespace DataStorm
{

    //
    // Sample template implementation
    //
    template<typename Key, typename Value, typename UpdateTag>
    SampleEvent Sample<Key, Value, UpdateTag>::getEvent() const noexcept
    {
        return _impl->event;
    }

    template<typename Key, typename Value, typename UpdateTag>
    const Key& Sample<Key, Value, UpdateTag>::getKey() const noexcept
    {
        return _impl->getKey();
    }

    template<typename Key, typename Value, typename UpdateTag>
    const Value& Sample<Key, Value, UpdateTag>::getValue() const noexcept
    {
        return _impl->getValue();
    }

    template<typename Key, typename Value, typename UpdateTag>
    UpdateTag Sample<Key, Value, UpdateTag>::getUpdateTag() const noexcept
    {
        return _impl->getTag();
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::chrono::time_point<std::chrono::system_clock> Sample<Key, Value, UpdateTag>::getTimeStamp() const noexcept
    {
        return _impl->timestamp;
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::string Sample<Key, Value, UpdateTag>::getOrigin() const noexcept
    {
        return _impl->origin;
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::string Sample<Key, Value, UpdateTag>::getSession() const noexcept
    {
        return _impl->session;
    }

    template<typename Key, typename Value, typename UpdateTag>
    Sample<Key, Value, UpdateTag>::Sample(const std::shared_ptr<DataStormI::Sample>& impl) noexcept
        : _impl(std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(impl))
    {
    }

    //
    // Reader template implementation
    //
    template<typename Key, typename Value, typename UpdateTag>
    Reader<Key, Value, UpdateTag>::Reader(Reader<Key, Value, UpdateTag>&& reader) noexcept
        : _impl(std::move(reader._impl))
    {
    }

    template<typename Key, typename Value, typename UpdateTag> Reader<Key, Value, UpdateTag>::~Reader()
    {
        if (_impl)
        {
            _impl->destroy();
        }
    }

    template<typename Key, typename Value, typename UpdateTag>
    Reader<Key, Value, UpdateTag>& Reader<Key, Value, UpdateTag>::operator=(Reader&& reader) noexcept
    {
        if (_impl)
        {
            _impl->destroy();
        }
        _impl = std::move(reader._impl);
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    bool Reader<Key, Value, UpdateTag>::hasWriters() const noexcept
    {
        return _impl->hasWriters();
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::waitForWriters(unsigned int count) const
    {
        _impl->waitForWriters(static_cast<int>(count));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::waitForNoWriters() const
    {
        _impl->waitForWriters(-1);
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<std::string> Reader<Key, Value, UpdateTag>::getConnectedWriters() const noexcept
    {
        return _impl->getConnectedElements();
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<Key> Reader<Key, Value, UpdateTag>::getConnectedKeys() const noexcept
    {
        std::vector<Key> keys;
        auto connectedKeys = _impl->getConnectedKeys();
        keys.reserve(connectedKeys.size());
        for (const auto& k : connectedKeys)
        {
            keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
        }
        return keys;
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<Sample<Key, Value, UpdateTag>> Reader<Key, Value, UpdateTag>::getAllUnread() noexcept
    {
        auto unread = _impl->getAllUnread();
        std::vector<Sample<Key, Value, UpdateTag>> samples;
        samples.reserve(unread.size());
        for (auto sample : unread)
        {
            samples.emplace_back(sample);
        }
        return samples;
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::waitForUnread(unsigned int count) const
    {
        _impl->waitForUnread(count);
    }

    template<typename Key, typename Value, typename UpdateTag>
    bool Reader<Key, Value, UpdateTag>::hasUnread() const noexcept
    {
        return _impl->hasUnread();
    }

    template<typename Key, typename Value, typename UpdateTag>
    Sample<Key, Value, UpdateTag> Reader<Key, Value, UpdateTag>::getNextUnread()
    {
        return Sample<Key, Value, UpdateTag>(_impl->getNextUnread());
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::onConnectedKeys(
        std::function<void(std::vector<Key>)> init,
        std::function<void(CallbackReason, Key)> update) noexcept
    {
        _impl->onConnectedKeys(init ? [init](std::vector<std::shared_ptr<DataStormI::Key>> connectedKeys)
    {
        std::vector<Key> keys;
        keys.reserve(connectedKeys.size());
        for(const auto& k : connectedKeys)
        {
            keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
        }
        init(std::move(keys));
    } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>(),
    update ? [update](CallbackReason action, std::shared_ptr<DataStormI::Key> key)
    {
        update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->get());
    } : std::function<void(CallbackReason, std::shared_ptr<DataStormI::Key>)>());
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::onConnectedWriters(
        std::function<void(std::vector<std::string>)> init,
        std::function<void(CallbackReason, std::string)> update) noexcept
    {
        _impl->onConnectedElements(init, update);
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::onSamples(
        std::function<void(std::vector<Sample<Key, Value, UpdateTag>>)> init,
        std::function<void(Sample<Key, Value, UpdateTag>)> update) noexcept
    {
        auto communicator = _impl->getCommunicator();
        _impl->onSamples(init ? [communicator, init](const std::vector<std::shared_ptr<DataStormI::Sample>>& samplesI)
    {
        std::vector<Sample<Key, Value, UpdateTag>> samples;
        samples.reserve(samplesI.size());
        for(const auto& s : samplesI)
        {
            samples.emplace_back(s);
        }
        init(std::move(samples));
    } : std::function<void(const std::vector<std::shared_ptr<DataStormI::Sample>>&)>(),
    update ? [communicator, update](const std::shared_ptr<DataStormI::Sample>& sampleI)
    {
        update(sampleI);
    } : std::function<void(const std::shared_ptr<DataStormI::Sample>&)>());
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyReader<Key, Value, UpdateTag>::SingleKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const Key& key,
        const std::string& name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(topic.getReader()->create({topic._keyFactory->create(key)}, name, config))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename SFC>
    SingleKeyReader<Key, Value, UpdateTag>::SingleKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const Key& key,
        const Filter<SFC>& sampleFilter,
        const std::string& name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(topic.getReader()->create(
              {topic._keyFactory->create(key)},
              name,
              config,
              sampleFilter.name,
              DataStormI::EncoderT<SFC>::encode(topic.getCommunicator(), sampleFilter.criteria)))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyReader<Key, Value, UpdateTag>::SingleKeyReader(SingleKeyReader<Key, Value, UpdateTag>&& reader) noexcept
        : Reader<Key, Value, UpdateTag>(std::move(reader))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyReader<Key, Value, UpdateTag>&
    SingleKeyReader<Key, Value, UpdateTag>::operator=(SingleKeyReader&& reader) noexcept
    {
        Reader<Key, Value, UpdateTag>::operator=(std::move(reader));
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyReader<Key, Value, UpdateTag>::MultiKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const std::vector<Key>& keys,
        const std::string& name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(topic.getReader()->create(topic._keyFactory->create(keys), name, config))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename SFC>
    MultiKeyReader<Key, Value, UpdateTag>::MultiKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const std::vector<Key>& keys,
        const Filter<SFC>& sampleFilter,
        const std::string& name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(topic.getReader()->create(
              topic._keyFactory->create(keys),
              name,
              config,
              sampleFilter.name,
              Encoder<SFC>::encode(topic.getCommunicator(), sampleFilter.criteria)))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyReader<Key, Value, UpdateTag>::MultiKeyReader(MultiKeyReader<Key, Value, UpdateTag>&& reader) noexcept
        : Reader<Key, Value, UpdateTag>(std::move(reader))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyReader<Key, Value, UpdateTag>&
    MultiKeyReader<Key, Value, UpdateTag>::operator=(MultiKeyReader&& reader) noexcept
    {
        Reader<Key, Value, UpdateTag>::operator=(std::move(reader));
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename KFC>
    FilteredKeyReader<Key, Value, UpdateTag>::FilteredKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const Filter<KFC>& filter,
        const std::string& name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(
              topic._keyFilterFactories->create(filter.name, filter.criteria),
              name,
              config))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename KFC, typename SFC>
    FilteredKeyReader<Key, Value, UpdateTag>::FilteredKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const Filter<KFC>& keyFilter,
        const Filter<SFC>& sampleFilter,
        const std::string& name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(
              topic._keyFilterFactories->create(keyFilter.name, keyFilter.criteria),
              name,
              config,
              sampleFilter.name,
              Encoder<SFC>::encode(topic.getCommunicator(), sampleFilter.criteria)))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    FilteredKeyReader<Key, Value, UpdateTag>::FilteredKeyReader(
        FilteredKeyReader<Key, Value, UpdateTag>&& reader) noexcept
        : Reader<Key, Value, UpdateTag>(std::move(reader))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    FilteredKeyReader<Key, Value, UpdateTag>&
    FilteredKeyReader<Key, Value, UpdateTag>::operator=(FilteredKeyReader&& reader) noexcept
    {
        Reader<Key, Value, UpdateTag>::operator=(std::move(reader));
        return *this;
    }

    //
    // Writer template implementation
    //
    template<typename Key, typename Value, typename UpdateTag>
    Writer<Key, Value, UpdateTag>::Writer(Writer&& writer) noexcept : _impl(std::move(writer._impl))
    {
    }

    template<typename Key, typename Value, typename UpdateTag> Writer<Key, Value, UpdateTag>::~Writer()
    {
        if (_impl)
        {
            _impl->destroy();
        }
    }

    template<typename Key, typename Value, typename UpdateTag>
    Writer<Key, Value, UpdateTag>& Writer<Key, Value, UpdateTag>::operator=(Writer&& writer) noexcept
    {
        if (_impl)
        {
            _impl->destroy();
        }
        _impl = std::move(writer._impl);
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    bool Writer<Key, Value, UpdateTag>::hasReaders() const noexcept
    {
        return _impl->hasReaders();
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Writer<Key, Value, UpdateTag>::waitForReaders(unsigned int count) const
    {
        return _impl->waitForReaders(static_cast<int>(count));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Writer<Key, Value, UpdateTag>::waitForNoReaders() const
    {
        return _impl->waitForReaders(-1);
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<std::string> Writer<Key, Value, UpdateTag>::getConnectedReaders() const noexcept
    {
        return _impl->getConnectedElements();
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<Key> Writer<Key, Value, UpdateTag>::getConnectedKeys() const noexcept
    {
        std::vector<Key> keys;
        auto connectedKeys = _impl->getConnectedKeys();
        keys.reserve(connectedKeys.size());
        for (const auto& k : connectedKeys)
        {
            keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
        }
        return keys;
    }

    template<typename Key, typename Value, typename UpdateTag>
    Sample<Key, Value, UpdateTag> Writer<Key, Value, UpdateTag>::getLast()
    {
        auto sample = _impl->getLast();
        if (!sample)
        {
            throw std::logic_error("no sample");
        }
        return Sample<Key, Value, UpdateTag>(sample);
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<Sample<Key, Value, UpdateTag>> Writer<Key, Value, UpdateTag>::getAll() noexcept
    {
        auto all = _impl->getAll();
        std::vector<Sample<Key, Value, UpdateTag>> samples;
        samples.reserve(all.size());
        for (auto sample : all)
        {
            samples.emplace_back(sample);
        }
        return samples;
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Writer<Key, Value, UpdateTag>::onConnectedKeys(
        std::function<void(std::vector<Key>)> init,
        std::function<void(CallbackReason, Key)> update) noexcept
    {
        _impl->onConnectedKeys(init ? [init](std::vector<std::shared_ptr<DataStormI::Key>> connectedKeys)
    {
        std::vector<Key> keys;
        keys.reserve(connectedKeys.size());
        for(const auto& k : connectedKeys)
        {
            keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
        }
        init(std::move(keys));
    } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>(),
    update ? [update](CallbackReason action, std::shared_ptr<DataStormI::Key> key)
    {
        update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->get());
    } : std::function<void(CallbackReason, std::shared_ptr<DataStormI::Key>)>());
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Writer<Key, Value, UpdateTag>::onConnectedReaders(
        std::function<void(std::vector<std::string>)> init,
        std::function<void(CallbackReason, std::string)> update) noexcept
    {
        _impl->onConnectedElements(init, update);
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyWriter<Key, Value, UpdateTag>::SingleKeyWriter(
        const Topic<Key, Value, UpdateTag>& topic,
        const Key& key,
        const std::string& name,
        const WriterConfig& config) noexcept
        : Writer<Key, Value, UpdateTag>(topic.getWriter()->create({topic._keyFactory->create(key)}, name, config)),
          _tagFactory(topic._tagFactory)
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyWriter<Key, Value, UpdateTag>::SingleKeyWriter(SingleKeyWriter<Key, Value, UpdateTag>&& writer) noexcept
        : Writer<Key, Value, UpdateTag>(std::move(writer)),
          _tagFactory(std::move(writer._tagFactory))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyWriter<Key, Value, UpdateTag>&
    SingleKeyWriter<Key, Value, UpdateTag>::operator=(SingleKeyWriter&& writer) noexcept
    {
        Writer<Key, Value, UpdateTag>::operator=(std::move(writer));
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    void SingleKeyWriter<Key, Value, UpdateTag>::add(const Value& value) noexcept
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            nullptr,
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Add, value));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void SingleKeyWriter<Key, Value, UpdateTag>::update(const Value& value) noexcept
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            nullptr,
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Update, value));
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename UpdateValue>
    std::function<void(const UpdateValue&)>
    SingleKeyWriter<Key, Value, UpdateTag>::partialUpdate(const UpdateTag& tag) noexcept
    {
        auto impl = Writer<Key, Value, UpdateTag>::_impl;
        auto updateTag = _tagFactory->create(tag);
        return [impl, updateTag](const UpdateValue& value)
        {
            auto encoded = Encoder<UpdateValue>::encode(impl->getCommunicator(), value);
            impl->publish(nullptr, std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
        };
    }

    template<typename Key, typename Value, typename UpdateTag>
    void SingleKeyWriter<Key, Value, UpdateTag>::remove() noexcept
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            nullptr,
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Remove));
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyWriter<Key, Value, UpdateTag>::MultiKeyWriter(
        const Topic<Key, Value, UpdateTag>& topic,
        const std::vector<Key>& keys,
        const std::string& name,
        const WriterConfig& config) noexcept
        : Writer<Key, Value, UpdateTag>(topic.getWriter()->create(topic._keyFactory->create(keys), name, config)),
          _keyFactory(topic._keyFactory),
          _tagFactory(topic._tagFactory)
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyWriter<Key, Value, UpdateTag>::MultiKeyWriter(MultiKeyWriter<Key, Value, UpdateTag>&& writer) noexcept
        : Writer<Key, Value, UpdateTag>(std::move(writer)),
          _keyFactory(std::move(writer._keyFactory)),
          _tagFactory(std::move(writer._tagFactory))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyWriter<Key, Value, UpdateTag>&
    MultiKeyWriter<Key, Value, UpdateTag>::operator=(MultiKeyWriter&& writer) noexcept
    {
        Writer<Key, Value, UpdateTag>::operator=(std::move(writer));
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    void MultiKeyWriter<Key, Value, UpdateTag>::add(const Key& key, const Value& value) noexcept
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            _keyFactory->create(key),
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Add, value));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void MultiKeyWriter<Key, Value, UpdateTag>::update(const Key& key, const Value& value) noexcept
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            _keyFactory->create(key),
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Update, value));
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename UpdateValue>
    std::function<void(const Key&, const UpdateValue&)>
    MultiKeyWriter<Key, Value, UpdateTag>::partialUpdate(const UpdateTag& tag) noexcept
    {
        auto impl = Writer<Key, Value, UpdateTag>::_impl;
        auto updateTag = _tagFactory->create(tag);
        auto keyFactory = _keyFactory;
        return [impl, updateTag, keyFactory](const Key& key, const UpdateValue& value)
        {
            auto encoded = Encoder<UpdateValue>::encode(impl->getCommunicator(), value);
            impl->publish(
                keyFactory->create(key),
                std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
        };
    }

    template<typename Key, typename Value, typename UpdateTag>
    void MultiKeyWriter<Key, Value, UpdateTag>::remove(const Key& key) noexcept
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            _keyFactory->create(key),
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Remove));
    }

    /** @private */
    template<typename Value>
    std::function<std::function<bool(const Value&)>(const std::string&)> makeRegexFilter() noexcept
    {
        return [](const std::string& criteria)
        {
            std::regex expr(criteria);
            return [expr](const Value& value)
            {
                std::ostringstream os;
                os << value;
                return std::regex_match(os.str(), expr);
            };
        };
    }

    /** @private */
    template<typename Key, typename Value, typename UpdateTag>
    std::function<std::function<bool(const Sample<Key, Value, UpdateTag>&)>(const std::vector<SampleEvent>&)>
    makeSampleEventFilter(const Topic<Key, Value, UpdateTag>&) noexcept
    {
        return [](const std::vector<SampleEvent>& criteria)
        {
            return [criteria](const Sample<Key, Value, UpdateTag>& sample)
            { return std::find(criteria.begin(), criteria.end(), sample.getEvent()) != criteria.end(); };
        };
    }

    /** @private */
    template<typename T, typename V, typename Enabler = void> struct RegexFilter
    {
        template<typename F> static void add(F) {}
    };

    /** @private */
    template<typename T, typename V>
    struct RegexFilter<T, V, typename std::enable_if<DataStormI::is_streamable<V>::value>::type>
    {
        template<typename F> static void add(F factory)
        {
            factory->set("_regex", makeRegexFilter<T>()); // Only set the _regex filter if the value is streamable
        }
    };

    //
    // Topic template implementation
    //
    template<typename Key, typename Value, typename UpdateTag>
    Topic<Key, Value, UpdateTag>::Topic(const Node& node, const std::string& name) noexcept
        : _name(name),
          _topicFactory(node._factory),
          _keyFactory(DataStormI::KeyFactoryT<Key>::createFactory()),
          _tagFactory(DataStormI::TagFactoryT<UpdateTag>::createFactory()),
          _keyFilterFactories(DataStormI::FilterManagerT<DataStormI::KeyT<Key>>::create()),
          _sampleFilterFactories(DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>::create())
    {
        RegexFilter<Key, Key>::add(_keyFilterFactories);
        RegexFilter<Sample<Key, Value, UpdateTag>, Value>::add(_sampleFilterFactories);
        _sampleFilterFactories->set("_event", makeSampleEventFilter(*this));
    }

    template<typename Key, typename Value, typename UpdateTag> Topic<Key, Value, UpdateTag>::~Topic()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_reader)
        {
            _reader->destroy();
        }
        if (_writer)
        {
            _writer->destroy();
        }
    }

    template<typename Key, typename Value, typename UpdateTag>
    Topic<Key, Value, UpdateTag>& Topic<Key, Value, UpdateTag>::operator=(Topic<Key, Value, UpdateTag>&& topic) noexcept
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_reader)
        {
            _reader->destroy();
        }
        if (_writer)
        {
            _writer->destroy();
        }
        _name = std::move(topic._name);
        _topicFactory = std::move(topic._topicFactory);
        _keyFactory = std::move(topic._keyFactory);
        _tagFactory = std::move(topic._tagFactory);
        _keyFilterFactories = std::move(topic._keyFilterFactories);
        _sampleFilterFactories = std::move(topic._sampleFilterFactories);
        _reader = std::move(topic._reader);
        _writer = std::move(topic._writer);
        _updaters = std::move(topic._updaters);
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    bool Topic<Key, Value, UpdateTag>::hasWriters() const noexcept
    {
        return getReader()->hasWriters();
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::waitForWriters(unsigned int count) const
    {
        getReader()->waitForWriters(static_cast<int>(count));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::waitForNoWriters() const
    {
        getReader()->waitForWriters(-1);
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::setReaderDefaultConfig(const ReaderConfig& config) noexcept
    {
        getReader()->setDefaultConfig(config);
    }

    template<typename Key, typename Value, typename UpdateTag>
    bool Topic<Key, Value, UpdateTag>::hasReaders() const noexcept
    {
        return getWriter()->hasReaders();
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::waitForReaders(unsigned int count) const
    {
        getWriter()->waitForReaders(static_cast<int>(count));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::waitForNoReaders() const
    {
        getWriter()->waitForReaders(-1);
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::setWriterDefaultConfig(const WriterConfig& config) noexcept
    {
        getWriter()->setDefaultConfig(config);
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename UpdateValue>
    void Topic<Key, Value, UpdateTag>::setUpdater(
        const UpdateTag& tag,
        std::function<void(Value&, UpdateValue)> updater) noexcept
    {
        std::lock_guard<std::mutex> lock(_mutex);
        auto tagI = _tagFactory->create(std::move(tag));
        auto updaterImpl = updater ? [updater](const std::shared_ptr<DataStormI::Sample>& previous,
                                           const std::shared_ptr<DataStormI::Sample>& next,
                                           const Ice::CommunicatorPtr& communicator)
    {
        Value value;
        if(previous)
        {
            value = Cloner<Value>::clone(
                std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(previous)->getValue());
        }
        updater(value, Decoder<UpdateValue>::decode(communicator, next->getEncodedValue()));
        std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(next)->setValue(std::move(value));
    } : std::function<void(const std::shared_ptr<DataStormI::Sample>&,
                           const std::shared_ptr<DataStormI::Sample>&,
                           const Ice::CommunicatorPtr&)>();

        if (_reader && !_writer)
        {
            _reader->setUpdater(tagI, updaterImpl);
        }
        else if (_writer && !_reader)
        {
            _writer->setUpdater(tagI, updaterImpl);
        }
        else if (_reader && _writer)
        {
            _reader->setUpdater(tagI, updaterImpl);
            _writer->setUpdater(tagI, updaterImpl);
        }
        else
        {
            _updaters[tagI] = updaterImpl;
        }
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename Criteria>
    void Topic<Key, Value, UpdateTag>::setKeyFilter(
        const std::string& name,
        std::function<std::function<bool(const Key&)>(const Criteria&)> factory) noexcept
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _keyFilterFactories->set(name, factory);
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename Criteria>
    void Topic<Key, Value, UpdateTag>::setSampleFilter(
        const std::string& name,
        std::function<std::function<bool(const Sample<Key, Value, UpdateTag>&)>(const Criteria&)> factory) noexcept
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _sampleFilterFactories->set(name, factory);
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::shared_ptr<DataStormI::TopicReader> Topic<Key, Value, UpdateTag>::getReader() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (!_reader)
        {
            auto sampleFactory = std::make_shared<DataStormI::SampleFactoryT<Key, Value, UpdateTag>>();
            _reader = _topicFactory->createTopicReader(
                _name,
                _keyFactory,
                _tagFactory,
                std::move(sampleFactory),
                _keyFilterFactories,
                _sampleFilterFactories);
            _reader->setUpdaters(_writer ? _writer->getUpdaters() : _updaters);
            _updaters.clear();
        }
        return _reader;
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::shared_ptr<DataStormI::TopicWriter> Topic<Key, Value, UpdateTag>::getWriter() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (!_writer)
        {
            _writer = _topicFactory->createTopicWriter(
                _name,
                _keyFactory,
                _tagFactory,
                nullptr,
                _keyFilterFactories,
                _sampleFilterFactories);
            _writer->setUpdaters(_reader ? _reader->getUpdaters() : _updaters);
            _updaters.clear();
        }
        return _writer;
    }

    template<typename Key, typename Value, typename UpdateTag>
    Ice::CommunicatorPtr Topic<Key, Value, UpdateTag>::getCommunicator() const noexcept
    {
        return _topicFactory->getCommunicator();
    }

}
#endif
